package cn.turing.firecontrol.exchange.service.impl;

import cn.turing.firecontrol.exchange.utils.DBHelper;
import org.apache.http.HttpHost;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;


/**
 * @author Richardwoo
 * @time 2021-5-15
 *
 * Class description: bulk-imports MySQL table data into Elasticsearch using a BulkProcessor.
 */

public class BulkProcessImpl {

    /** Elasticsearch endpoint, shared by every client this class creates. */
    private static final String ES_HOST = "192.168.19.130";
    private static final int ES_PORT = 9200;
    private static final String ES_SCHEME = "http";

    /** Rows buffered per bulk submission; the tail (< BATCH_SIZE) is flushed after the loop. */
    private static final int BATCH_SIZE = 200000;

    /** Shared client for index-management calls; released via clientClose(). */
    private static RestHighLevelClient client;

    // JDBC resources held as fields so the finally block in writeMysqlDataToES can release them.
    Connection conn = null;
    PreparedStatement ps = null;
    ResultSet rs = null;

    static {
        client = new RestHighLevelClient(
                RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME)));
    }

    /**
     * Checks whether the given index exists in Elasticsearch.
     *
     * @param index index name to look up
     * @return true if the index already exists
     * @throws Exception on transport/cluster errors
     */
    public static boolean indexExists(String index) throws Exception {
        GetIndexRequest getIndexRequest = new GetIndexRequest();
        getIndexRequest.indices(index);
        return client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
    }

    /**
     * Creates the index (lower-cased) if it does not already exist.
     * Settings tuned for bulk loading: 5 shards, 2 replicas, and refresh
     * disabled ("-1") so indexing throughput is not throttled by refreshes.
     * NOTE(review): refresh stays disabled after the load — re-enable it
     * afterwards if the index must become searchable; confirm with the caller.
     *
     * @param indexName index to create
     * @return true if the index exists or the create request was acknowledged
     * @throws Exception on transport/cluster errors
     */
    public static boolean createIndex(String indexName) throws Exception {
        if (indexExists(indexName)) {
            return true;
        }
        CreateIndexRequest requestIndex = new CreateIndexRequest(indexName.toLowerCase());
        requestIndex.settings(Settings.builder()
                .put("index.number_of_shards", 5)
                .put("index.number_of_replicas", 2)
                .put("index.refresh_interval", "-1"));
        CreateIndexResponse createIndexResponse = client.indices().create(requestIndex, RequestOptions.DEFAULT);
        return createIndexResponse.isAcknowledged();
    }

    /**
     * Executes the query as a forward-only, read-only cursor with
     * fetchSize = Integer.MIN_VALUE, which tells the MySQL driver to stream
     * rows one at a time instead of buffering the whole result set in memory.
     * The Connection/PreparedStatement/ResultSet are stored in fields and are
     * closed by the caller (writeMysqlDataToES's finally block).
     *
     * @param sql the SELECT statement to execute
     * @return the streaming ResultSet
     * @throws Exception on connection or query failure
     */
    public ResultSet getResultSet(String sql) throws Exception {
        conn = DBHelper.getConn();
        ps = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        ps.setFetchSize(Integer.MIN_VALUE);
        rs = ps.executeQuery();
        return rs;
    }

    /**
     * Streams every row of the given MySQL table into Elasticsearch,
     * mapping each row to a column-name -> string-value map and submitting
     * them through a BulkProcessor in batches of {@value #BATCH_SIZE}.
     *
     * @param tableName source MySQL table (interpolated into SQL — must be a
     *                  trusted identifier, not user input)
     * @param indexName target Elasticsearch index (created if absent)
     * @param type      Elasticsearch document type
     * @param idName    column whose value becomes the document _id
     * @throws Exception on JDBC or Elasticsearch failure
     */
    public void writeMysqlDataToES(String tableName, String indexName, String type, String idName) throws Exception {
        // Dedicated client for the bulk load so closing it cannot break the shared static client.
        RestHighLevelClient bulkClient = new RestHighLevelClient(
                RestClient.builder(new HttpHost(ES_HOST, ES_PORT, ES_SCHEME)));
        BulkProcessor bulkProcessor = null;
        try {
            createIndex(indexName);
            bulkProcessor = getBulkProcessor(bulkClient);

            String sql = "SELECT * from " + tableName;
            ResultSet rs = getResultSet(sql);
            ResultSetMetaData colData = rs.getMetaData();
            int columnCount = colData.getColumnCount();

            ArrayList<HashMap<String, String>> dataList = new ArrayList<HashMap<String, String>>();
            int count = 0;
            while (rs.next()) {
                count++;
                HashMap<String, String> row = new HashMap<String, String>(100);
                for (int i = 1; i <= columnCount; i++) {
                    String col = colData.getColumnName(i);
                    row.put(col, rs.getString(col));
                }
                dataList.add(row);
                // Submit every BATCH_SIZE rows; the remainder is submitted after the loop.
                if (count % BATCH_SIZE == 0) {
                    addBatch(bulkProcessor, dataList, indexName, type, idName);
                    dataList.clear();
                }
            }

            // BUG FIX: the tail batch previously used a hard-coded index
            // (tableName.toLowerCase()), type ("gzdc") and id column ("S_GUID")
            // instead of the method parameters used by the main loop.
            addBatch(bulkProcessor, dataList, indexName, type, idName);

            // Push buffered requests to ES; visibility still depends on the
            // index refresh interval configured in createIndex.
            bulkProcessor.flush();
        } finally {
            // Guard each close so one failure cannot leak the remaining resources.
            if (rs != null) {
                try { rs.close(); } catch (Exception e) { e.printStackTrace(); }
            }
            if (ps != null) {
                try { ps.close(); } catch (Exception e) { e.printStackTrace(); }
            }
            if (conn != null) {
                try { conn.close(); } catch (Exception e) { e.printStackTrace(); }
            }
            // Null-check: bulkProcessor is null if createIndex/getBulkProcessor threw.
            if (bulkProcessor != null) {
                bulkProcessor.awaitClose(150L, TimeUnit.SECONDS);
            }
            bulkClient.close();
        }
    }

    /**
     * Submits one buffered batch of rows to the BulkProcessor.
     * IndexRequest.source(Map) serializes the map immediately, so the caller
     * may clear/reuse the maps afterwards.
     */
    private static void addBatch(BulkProcessor bulkProcessor, ArrayList<HashMap<String, String>> dataList,
                                 String indexName, String type, String idName) {
        for (HashMap<String, String> row : dataList) {
            bulkProcessor.add(new IndexRequest(indexName, type, row.get(idName)).source(row));
        }
    }

    /**
     * Builds a BulkProcessor bound to the given client.
     * Configuration:
     *   bulkActions        10000 requests per bulk
     *   bulkSize           300 MB per bulk
     *   concurrentRequests 10 in-flight bulks
     *   flushInterval      flush every 100 seconds
     *   backoffPolicy      constant 1-second backoff, 3 retries
     * Exceptions propagate to the caller (the previous version caught them and
     * then NPE'd by calling awaitClose() on the still-null processor).
     *
     * @param client the client bulk requests are executed against
     * @return the configured BulkProcessor
     * @throws Exception if the processor cannot be built
     */
    private static BulkProcessor getBulkProcessor(RestHighLevelClient client) throws Exception {
        BulkProcessor.Listener listener = new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId, BulkRequest request) {
                request.numberOfActions();
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
                System.out.println("executionId:" + executionId + "  " + request.numberOfActions());
            }

            @Override
            public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
                System.out.println("executionId:" + executionId + "  " + failure.getMessage());
            }
        };

        BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client
                .bulkAsync(request, RequestOptions.DEFAULT, bulkListener);

        BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
        builder.setBulkActions(10000);
        builder.setBulkSize(new ByteSizeValue(300L, ByteSizeUnit.MB));
        builder.setConcurrentRequests(10);
        builder.setFlushInterval(TimeValue.timeValueSeconds(100L));
        builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
        // build() must be called, otherwise the builder settings never take effect.
        return builder.build();
    }

    /** Releases the shared static client. */
    private static void clientClose() throws Exception {
        client.close();
    }
}

